home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)
#
# NOTE(review): this module was recovered from decompiler output.  Constructs
# the decompiler could not express (exception unpacking, ``a or b`` chains,
# the package-relative import) have been reconstructed by hand; spots where
# the original intent could not be recovered are marked NOTE(review).

import cups
import pprint
import os
import tempfile
import re
import string
import locale
# The decompiler printed "from import _debugprint" (empty module name), which
# is how a level-1 relative import appears once the dot is lost.
from . import _debugprint


class Printer:
    """
    A CUPS print queue together with the attributes reported for it.
    """

    # CUPS_PRINTER_* flags that are never exposed as instance attributes.
    _flags_blacklist = ['options', 'local']

    def __init__(self, name, connection, **kw):
        """
        @param name: printer name
        @type name: string
        @param connection: CUPS connection
        @type connection: CUPS.Connection object
        @param kw: printer attributes
        @type kw: dict indexed by string
        """
        # Set before update() so that __del__ is safe even if update() raises.
        self._ppd = None
        self.name = name
        self.connection = connection
        self.class_members = []
        self.update(**kw)

    def __del__(self):
        # getattr: the instance may be only partially constructed.
        ppd_file = getattr(self, '_ppd', None)
        if ppd_file is not None:
            try:
                os.unlink(ppd_file)
            except OSError:
                # Best effort: the temporary PPD may already be gone.
                pass

    def __repr__(self):
        return '<cupshelpers.Printer "%s">' % self.name

    def _expand_flags(self):
        """
        Turn every CUPS_PRINTER_* bit of self.type into a boolean attribute
        named after the lower-cased flag ('class' becomes 'is_class').
        """

        def _ascii_lower(text):
            # ASCII-only lower-casing that does not depend on the locale.
            return ''.join([chr(ord(ch) + 32) if 'A' <= ch <= 'Z' else ch
                            for ch in text])

        prefix = 'CUPS_PRINTER_'
        prefix_length = len(prefix)
        for name in cups.__dict__:
            if not name.startswith(prefix):
                continue
            attr_name = _ascii_lower(name[prefix_length:])
            if attr_name in self._flags_blacklist:
                continue
            if attr_name == 'class':
                attr_name = 'is_class'
            setattr(self, attr_name, bool(self.type & getattr(cups, name)))

    def update(self, **kw):
        """
        Update object from printer attributes.

        @param kw: printer attributes
        @type kw: dict indexed by string
        """
        self.state = kw.get('printer-state', 0)
        self.enabled = self.state != cups.IPP_PRINTER_STOPPED
        self.device_uri = kw.get('device-uri', '')
        self.info = kw.get('printer-info', '')
        self.is_shared = kw.get('printer-is-shared', None)
        self.location = kw.get('printer-location', '')
        self.make_and_model = kw.get('printer-make-and-model', '')
        self.type = kw.get('printer-type', 0)
        self.uri_supported = kw.get('printer-uri-supported', '')
        if not isinstance(self.uri_supported, list):
            self.uri_supported = [self.uri_supported]
        self._expand_flags()
        # When the server did not report 'printer-is-shared', fall back to
        # the 'not_shared' flag derived from the printer type bits.
        if self.is_shared is None:
            self.is_shared = not self.not_shared
        del self.not_shared

    def getAttributes(self):
        """
        Fetch further attributes for the printer.

        Normally only a small set of attributes is fetched.  This method is
        for fetching more.
        """
        attrs = self.connection.getPrinterAttributes(self.name)
        self.attributes = {}
        self.other_attributes = {}
        self.possible_attributes = {
            'landscape': ('False', ['True', 'False']),
            'page-border': ('none', ['none', 'single', 'single-thick',
                                     'double', 'double-thick']),
        }

        # '-default' attributes that are handled separately below or that
        # are not collected into self.attributes.
        skipped_defaults = ('job-sheets', 'printer-error-policy',
                            'printer-op-policy', 'notify-events',
                            'document-format', 'notify-lease-duration')

        for key, value in attrs.items():
            if key.endswith('-default'):
                name = key[:-len('-default')]
                if name in skipped_defaults:
                    continue
                # A list value is flattened into a comma-separated string.
                if isinstance(value, list):
                    value = ','.join(value)
                self.attributes[name] = value
                supported_key = name + '-supported'
                if supported_key in attrs:
                    self.possible_attributes[name] = (value,
                                                      attrs[supported_key])
            elif (not key.endswith('-supported')
                  and key != 'job-sheets-default'
                  and key != 'printer-error-policy'
                  and key != 'printer-op-policy'
                  and not key.startswith('requesting-user-name-')):
                self.other_attributes[key] = value

        (self.job_sheet_start, self.job_sheet_end) = attrs.get(
            'job-sheets-default', ('none', 'none'))
        self.job_sheets_supported = attrs.get('job-sheets-supported',
                                              ['none'])
        self.error_policy = attrs.get('printer-error-policy', 'none')
        self.error_policy_supported = attrs.get(
            'printer-error-policy-supported', ['none'])
        # An empty or missing operation policy means 'default'.
        self.op_policy = attrs.get('printer-op-policy', '') or 'default'
        self.op_policy_supported = attrs.get('printer-op-policy-supported',
                                             ['default'])

        self.default_allow = True
        self.except_users = []
        if 'requesting-user-name-allowed' in attrs:
            self.except_users = attrs['requesting-user-name-allowed']
            self.default_allow = False
        elif 'requesting-user-name-denied' in attrs:
            self.except_users = attrs['requesting-user-name-denied']
        self.except_users_string = ', '.join(self.except_users)

        # For smb: devices keep the device URI this object already holds
        # instead of the freshly fetched one.
        # NOTE(review): presumably the fetched URI lacks credentials --
        # confirm against the CUPS server behaviour.
        if ('device-uri' in attrs
                and attrs['device-uri'].startswith('smb:')):
            attrs['device-uri'] = self.device_uri
        self.update(**attrs)

    def getServer(self):
        """
        Find out which server defines this printer.

        @returns: server URI or None
        """
        first_uri = self.uri_supported[0]
        if not first_uri.startswith('ipp://'):
            return None
        # Strip the 'ipp://' scheme, then the path, then the port.
        host = first_uri[6:].split('/')[0].split(':')[0]
        if host == 'localhost.localdomain':
            host = 'localhost'
        return host

    def getPPD(self):
        """
        Obtain the printer's PPD.

        @returns: cups.PPD object, or False for raw queues
        @raise cups.IPPError: IPP error
        """
        result = None
        if self._ppd is None:
            try:
                self._ppd = self.connection.getPPD(self.name)
                result = cups.PPD(self._ppd)
            except cups.IPPError as exc:
                (e, m) = exc.args
                if e == cups.IPP_NOT_FOUND:
                    # No PPD on the server: this is a raw queue.
                    result = False
                else:
                    raise

        # Re-open the cached PPD file only when it was not parsed above.
        if result is None and self._ppd is not None:
            result = cups.PPD(self._ppd)
        return result

    def setOption(self, name, value):
        """
        Set a printer's option.

        @param name: option name
        @type name: string
        @param value: option value
        @type value: option-specific
        """
        if isinstance(value, float):
            radixchar = locale.nl_langinfo(locale.RADIXCHAR)
            if radixchar != '.':
                # Always send floats with '.' as the decimal point.
                value = str(value).replace(radixchar, '.')
        self.connection.addPrinterOptionDefault(self.name, name, value)

    def unsetOption(self, name):
        """
        Unset a printer's option.

        @param name: option name
        @type name: string
        """
        self.connection.deletePrinterOptionDefault(self.name, name)

    def setEnabled(self, on, reason=None):
        """
        Set the printer's enabled state.

        @param on: whether it will be enabled
        @type on: bool
        @param reason: reason for this state
        @type reason: string
        """
        if on:
            self.connection.enablePrinter(self.name)
        elif reason:
            self.connection.disablePrinter(self.name, reason=reason)
        else:
            self.connection.disablePrinter(self.name)

    def setAccepting(self, on, reason=None):
        """
        Set the printer's accepting state.

        @param on: whether it will be accepting
        @type on: bool
        @param reason: reason for this state
        @type reason: string
        """
        if on:
            self.connection.acceptJobs(self.name)
        elif reason:
            self.connection.rejectJobs(self.name, reason=reason)
        else:
            self.connection.rejectJobs(self.name)

    def setShared(self, on):
        """
        Set the printer's shared state.

        @param on: whether it will be shared
        @type on: bool
        """
        self.connection.setPrinterShared(self.name, on)

    def setErrorPolicy(self, policy):
        """
        Set the printer's error policy.

        @param policy: error policy
        @type policy: string
        """
        self.connection.setPrinterErrorPolicy(self.name, policy)

    def setOperationPolicy(self, policy):
        """
        Set the printer's operation policy.

        @param policy: operation policy
        @type policy: string
        """
        self.connection.setPrinterOpPolicy(self.name, policy)

    def setJobSheets(self, start, end):
        """
        Set the printer's job sheets.

        @param start: start sheet
        @type start: string
        @param end: end sheet
        @type end: string
        """
        self.connection.setPrinterJobSheets(self.name, start, end)

    def setAccess(self, allow, except_users):
        """
        Set access control list.

        @param allow: whether to allow by default, otherwise deny
        @type allow: bool
        @param except_users: exception list
        @type except_users: string list
        """
        # NOTE(review): the recovered source has an empty body here, so this
        # method currently does nothing.  The implementation was most likely
        # lost in decompilation -- restore it from the original source.
        pass

    def jobsQueued(self, only_tests=False):
        """
        Find out whether jobs are queued for this printer.

        @param only_tests: whether to restrict search to test pages
        @type only_tests: bool
        @returns: list of job IDs
        """
        ret = []
        try:
            jobs = self.connection.getJobs()
        except cups.IPPError:
            return ret

        for job_id, attrs in jobs.items():
            try:
                uri = attrs['job-printer-uri']
                # The queue name is the last path component of the URI.
                queue = uri[uri.rindex('/') + 1:]
            except (KeyError, ValueError, AttributeError, TypeError):
                # Job without a usable printer URI: not ours.
                continue
            if queue != self.name:
                continue
            if not only_tests or attrs.get('job-name') == 'Test Page':
                ret.append(job_id)
        return ret

    def testsQueued(self):
        """
        Find out whether test jobs are queued for this printer.

        @returns: list of job IDs
        """
        return self.jobsQueued(only_tests=True)

    def setAsDefault(self):
        """
        Set this printer as the system default.

        Besides setting the server default, rewrite any 'Default' entry for
        another printer in the server's lpoptions file to a 'Dest' entry.

        @returns: True if the lpoptions file was modified and uploaded,
        False otherwise
        @raise cups.HTTPError: fetching lpoptions failed for a reason other
        than the file not existing
        """
        self.connection.setDefault(self.name)

        resource = '/admin/conf/lpoptions'
        # Anonymous scratch file: unlink the name, keep the descriptor.
        (tmpfd, tmpfname) = tempfile.mkstemp()
        os.remove(tmpfname)
        try:
            self.connection.getFile(resource, fd=tmpfd)
        except cups.HTTPError as exc:
            os.close(tmpfd)
            (status,) = exc.args
            if status == cups.HTTP_NOT_FOUND:
                return False
            raise
        except Exception:
            os.close(tmpfd)
            raise

        f = os.fdopen(tmpfd, 'r+')
        try:
            f.seek(0)
            lines = f.readlines()
            changed = False
            for index, line in enumerate(lines):
                if not line.startswith('Default '):
                    continue
                # strip(): the name is followed by a newline when the entry
                # carries no options.
                name = line.split(' ')[1].strip()
                if name != self.name:
                    # len('Default ') == 8
                    lines[index] = 'Dest ' + line[8:]
                    changed = True

            if changed:
                f.seek(0)
                f.writelines(lines)
                f.truncate()
                f.flush()
                # putFile reads from the raw descriptor, so rewind that too.
                os.lseek(tmpfd, 0, os.SEEK_SET)
                try:
                    self.connection.putFile(resource, fd=tmpfd)
                except cups.HTTPError:
                    return False
            return changed
        finally:
            f.close()


def getPrinters(connection):
    """
    Obtain a list of printers.

    @param connection: CUPS connection
    @type connection: CUPS.Connection object
    @returns: L{Printer} list
    """
    printers = connection.getPrinters()
    classes = connection.getClasses()
    # Replace each attribute dict in place with a Printer object.
    for name in list(printers):
        printer = Printer(name, connection, **printers[name])
        printers[name] = printer
        if name in classes:
            printer.class_members = classes[name]
            printer.class_members.sort()
    return printers


def parseDeviceID(id):
    """
    Parse an IEEE 1284 Device ID, so that it may be indexed by field name.

    @param id: IEEE 1284 Device ID, without the two leading length bytes
    @type id: string
    @returns: dict indexed by field name
    """
    id_dict = {}
    for piece in id.split(';'):
        if ':' not in piece:
            continue
        (name, value) = piece.split(':', 1)
        id_dict[name] = value

    # Long field names are aliases for the short ones.
    for long_name, short_name in (('MANUFACTURER', 'MFG'),
                                  ('MODEL', 'MDL'),
                                  ('COMMAND SET', 'CMD')):
        if long_name in id_dict:
            id_dict.setdefault(short_name, id_dict[long_name])

    for name in ('MFG', 'MDL', 'CMD', 'CLS', 'DES', 'SN', 'S', 'P', 'J'):
        id_dict.setdefault(name, '')
    # CMD is always a list (an empty field yields ['']).
    id_dict['CMD'] = id_dict['CMD'].split(',')
    return id_dict


class Device:
    """
    This class represents a CUPS device.
    """

    def __init__(self, uri, **kw):
        """
        @param uri: device URI
        @type uri: string
        @param kw: device attributes
        @type kw: dict
        """
        self.uri = uri
        self.device_class = kw.get('device-class', 'Unknown')
        self.info = kw.get('device-info', '')
        self.make_and_model = kw.get('device-make-and-model', 'Unknown')
        self.id = kw.get('device-id', '')
        uri_pieces = uri.split(':')
        self.type = uri_pieces[0]
        # A URI with no ':' at all is treated as a device class.
        self.is_class = len(uri_pieces) == 1
        self.id_dict = parseDeviceID(self.id)

    def __repr__(self):
        return '<cupshelpers.Device "%s">' % self.uri

    def __cmp__(self, other):
        """
        Compare devices by order of preference.
        """
        if other is None:
            return -1
        if self.is_class != other.is_class:
            if other.is_class:
                return -1
            return 1
        # Equivalent to cmp(bool(self.id), bool(other.id)) without relying
        # on the Python-2-only cmp() builtin.
        mine = bool(self.id)
        theirs = bool(other.id)
        return (mine > theirs) - (mine < theirs)


def getDevices(connection):
    """
    Obtain a list of available CUPS devices.

    @param connection: CUPS connection
    @type connection: cups.Connection object
    @returns: a list of L{Device} objects
    @raise cups.IPPError: IPP Error
    """
    devices = connection.getDevices()
    # Replace each attribute dict in place with a Device object.
    for uri in list(devices):
        device = Device(uri, **devices[uri])
        devices[uri] = device
        # Fall back to the info string when no make-and-model was reported.
        if device.info != 'Unknown' and device.make_and_model == 'Unknown':
            device.make_and_model = device.info
    return devices


def activateNewPrinter(connection, name):
    """
    Set a new printer enabled, accepting jobs, and (if necessary) the
    default printer.

    @param connection: CUPS connection
    @type connection: cups.Connection object
    @param name: printer name
    @type name: string
    @raise cups.IPPError: IPP error
    """
    connection.enablePrinter(name)
    connection.acceptJobs(name)
    if connection.getDefault() is None:
        connection.setDefault(name)


def copyPPDOptions(ppd1, ppd2):
    """
    Copy default options between PPDs.

    @param ppd1: source PPD
    @type ppd1: cups.PPD object
    @param ppd2: destination PPD
    @type ppd2: cups.PPD object
    """

    def getPPDGroupOptions(group):
        # Options of this group plus, recursively, of all its subgroups.
        options = group.options[:]
        for subgroup in group.subgroups:
            options.extend(getPPDGroupOptions(subgroup))
        return options

    def iteratePPDOptions(ppd):
        for group in ppd.optionGroups:
            for option in getPPDGroupOptions(group):
                yield option

    for option in iteratePPDOptions(ppd1):
        if option.keyword == 'PageRegion':
            continue
        new_option = ppd2.findOption(option.keyword)
        if not new_option or option.ui != new_option.ui:
            continue
        value = option.defchoice
        # Only mark the value if the destination actually offers it.
        for choice in new_option.choices:
            if choice['choice'] == value:
                ppd2.markOption(new_option.keyword, value)
                _debugprint('set %s = %s' % (new_option.keyword, value))


def setPPDPageSize(ppd, language):
    """
    Set the PPD page size according to locale.

    @param ppd: PPD
    @type ppd: cups.PPD object
    @param language: language, as given by the first element of
    locale.setlocale
    @type language: string
    """
    letter = ['C', 'POSIX', 'en', 'en_US', 'en_CA', 'fr_CA']
    size = 'Letter' if language in letter else 'A4'
    try:
        ppd.markOption('PageSize', size)
        _debugprint('set PageSize = %s' % size)
    except Exception:
        # Best effort: failing to set the page size is not fatal.
        _debugprint('Failed to set PageSize (%s not available?)' % size)


def missingPackagesAndExecutables(ppd):
    """
    Check that all relevant executables for a PPD are installed.

    @param ppd: PPD
    @type ppd: cups.PPD object
    @returns: string list pair, representing missing packages and
    missing executables
    """

    def pathcheck(name, path='/usr/bin:/bin'):
        # Return the executable's path, 'true'/'builtin' for names that
        # need no file, or None when it cannot be found.
        p = name.find('%')
        if p != -1:
            # Drop everything from the first '%' place-holder onwards.
            name = name[:p]
        if len(name) == 0:
            return 'true'
        if name[0] == '/':
            if os.access(name, os.X_OK):
                _debugprint('%s: found' % name)
                return name
            _debugprint('%s: NOT found' % name)
            return None
        if name.find('=') != -1:
            # Variable assignment rather than a command.
            return 'builtin'
        if name in (':', '.', '[', 'alias', 'bind', 'break', 'cd',
                    'continue', 'declare', 'echo', 'else', 'eval', 'exec',
                    'exit', 'export', 'fi', 'if', 'kill', 'let', 'local',
                    'popd', 'printf', 'pushd', 'pwd', 'read', 'readonly',
                    'set', 'shift', 'shopt', 'source', 'test', 'then',
                    'trap', 'type', 'ulimit', 'umask', 'unalias', 'unset',
                    'wait'):
            return 'builtin'
        for component in path.split(':'):
            candidate = component.rstrip(os.path.sep) + os.path.sep + name
            if os.access(candidate, os.X_OK):
                _debugprint('%s: found' % candidate)
                return candidate
        _debugprint('%s: NOT found in %s' % (name, path))
        return None

    pkgs_to_install = []
    exes_to_install = []
    exe = None
    exepath = None

    attr = ppd.findAttr('FoomaticRIPCommandLine')
    if attr:
        cmdline = attr.value.replace('&&\n', '')
        # Decode the HTML entities used in the attribute value.
        cmdline = cmdline.replace('&quot;', '"')
        cmdline = cmdline.replace('&lt;', '<')
        cmdline = cmdline.replace('&gt;', '>')
        if cmdline.find('(') != -1 or cmdline.find('&') != -1:
            # Don't try to handle sub-shells or unreplaced entities.
            cmdline = ''

        ijs_prefix = '-sIjsServer='
        for pipe in cmdline.split(';'):
            for cmd in pipe.strip().split('|'):
                args = cmd.strip().split(' ')
                exe = args[0]
                exepath = pathcheck(exe)
                if exepath and os.path.basename(exepath) == 'gs':
                    # Ghostscript may need an IJS server: check that too.
                    for arg in args[1:]:
                        if arg.startswith(ijs_prefix):
                            exe = arg[len(ijs_prefix):]
                            exepath = pathcheck(exe)
                            break
                if not exepath:
                    # Stop at the first missing executable so that a later
                    # command cannot mask it.
                    break
            if not exepath:
                break

    if exepath or not exe:
        # Nothing missing so far: check the '*cupsFilter:' lines of the PPD.
        (tmpfd, tmpfname) = tempfile.mkstemp()
        os.unlink(tmpfname)
        try:
            ppd.writeFd(tmpfd)
            os.lseek(tmpfd, 0, os.SEEK_SET)
        except Exception:
            os.close(tmpfd)
            raise
        f = os.fdopen(tmpfd, 'r')
        try:
            search = '*cupsFilter:'
            for line in f:
                if not line.startswith(search):
                    continue
                line = line[len(search):].strip().strip('"')
                try:
                    (mimetype, cost, exe) = line.split(' ')
                except ValueError:
                    # Not a 'mimetype cost program' triple.
                    continue
                exepath = pathcheck(
                    exe, '/usr/lib/cups/filter:/usr/lib64/cups/filter')
                if not exepath:
                    # Report the first missing filter rather than letting a
                    # later, installed filter hide it.
                    break
        finally:
            f.close()

    if exe and not exepath:
        p = exe.find('%')
        if p != -1:
            exe = exe[:p]
        # Known executables and the package providing each one.
        pkgs = {
            'gs': 'ghostscript',
            'perl': 'perl',
            'foo2oak-wrapper': None,
            'pnm2ppa': 'pnm2ppa',
            'c2050': 'c2050',
            'c2070': 'c2070',
            'cjet': 'cjet',
            'lm1100': 'lx',
            'esc-m': 'min12xxw',
            'min12xxw': 'min12xxw',
            'pbm2l2030': 'pbm2l2030',
            'pbm2l7k': 'pbm2l7k',
            'pbm2lex': 'pbm2l7k',
            'hpijs': 'hpijs',
            'ijsgutenprint.5.0': 'gutenprint',
            'rastertogutenprint.5.0': 'gutenprint-cups',
            'commandtoepson': 'gutenprint-cups',
            'commandtocanon': 'gutenprint-cups',
        }
        pkg = pkgs.get(exe)
        if pkg:
            _debugprint('%s included in package %s' % (exe, pkg))
            pkgs_to_install.append(pkg)
        else:
            exes_to_install.append(exe)

    return (pkgs_to_install, exes_to_install)


def _main():
    c = cups.Connection()
    for device in getDevices(c).values():
        print('%s %s' % (device.uri, device.id_dict))


if __name__ == '__main__':
    _main()